/*!
 * Copyright (c) Microsoft Corporation and contributors. All rights reserved.
 * Licensed under the MIT License.
 */

import { strict as assert } from "node:assert";

import { generatePairwiseOptions } from "@fluid-private/test-pairwise-generator";
import type { IBatchMessage } from "@fluidframework/container-definitions/internal";
import {
	MessageType,
	type ISequencedDocumentMessage,
} from "@fluidframework/driver-definitions/internal";
import { MockLogger } from "@fluidframework/telemetry-utils/internal";

import { ContainerMessageType } from "../../index.js";
import type {
	InboundContainerRuntimeMessage,
	InboundSequencedContainerRuntimeMessage,
	LocalContainerRuntimeMessage,
} from "../../messageTypes.js";
import {
	BatchManager,
	type OutboundBatchMessage,
	type OutboundBatch,
	type OutboundSingletonBatch,
	type BatchStartInfo,
	ensureContentsDeserialized,
	type InboundMessageResult,
	OpCompressor,
	OpDecompressor,
	OpGroupingManager,
	OpSplitter,
	RemoteMessageProcessor,
} from "../../opLifecycle/index.js";

import { compressMultipleMessageBatch } from "./legacyCompression.js";

/**
 * Type guard that narrows an outbound batch to a singleton batch.
 *
 * @param batch - The batch to inspect.
 * @returns `true` when the batch holds exactly one message, `false` otherwise.
 */
function isSingletonBatch(batch: OutboundBatch): batch is OutboundSingletonBatch {
	const { length: messageCount } = batch.messages;
	return messageCount === 1;
}

/**
 * Makes a mock data store op whose contents are distinguishable in assertions.
 *
 * @param data - Arbitrary string used as the (mock) op contents.
 * @returns An object typed as a local container runtime message of type `FluidDataStoreOp`.
 */
function op(data: string): LocalContainerRuntimeMessage {
	// The string merely stands in for real op contents.
	const mockContents: unknown = data;
	// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
	return {
		type: ContainerMessageType.FluidDataStoreOp,
		contents: mockContents,
	} as LocalContainerRuntimeMessage;
}

describe("RemoteMessageProcessor", () => {
	/**
	 * Creates a fresh RemoteMessageProcessor for a test.
	 *
	 * The splitter, decompressor and grouping manager all share a single MockLogger,
	 * and the grouping manager is configured with grouped batching enabled.
	 */
	function getMessageProcessor(): RemoteMessageProcessor {
		const sharedLogger = new MockLogger();
		const splitter = new OpSplitter([], undefined, 1, 1, sharedLogger);
		const decompressor = new OpDecompressor(sharedLogger);
		const groupingManager = new OpGroupingManager(
			{ groupedBatchingEnabled: true },
			sharedLogger,
		);
		return new RemoteMessageProcessor(splitter, decompressor, groupingManager);
	}

	/**
	 * Builds an outbound batch message whose serialized contents wrap `{ key: value }`
	 * in a `FluidDataStoreOp` envelope.
	 *
	 * @param value - Value stored under `key` in the op contents.
	 * @param batchMetadata - When provided, emitted as `metadata.batch`; when omitted, `metadata` is `undefined`.
	 * @returns The outbound message, with an infinite reference sequence number.
	 */
	function getOutboundMessage(value: string, batchMetadata?: boolean): OutboundBatchMessage {
		const serializedOp = JSON.stringify({
			contents: { key: value },
			type: ContainerMessageType.FluidDataStoreOp,
		});
		const metadata = batchMetadata === undefined ? undefined : { batch: batchMetadata };
		return {
			metadata,
			referenceSequenceNumber: Number.POSITIVE_INFINITY,
			contents: serializedOp,
		};
	}

	/**
	 * Builds the message the tests expect to get back for an outbound message created
	 * by `getOutboundMessage(value, batchMetadata)`.
	 *
	 * @param value - Value expected under `key` in the contents.
	 * @param seqNum - Expected sequence number.
	 * @param clientSeqNum - Expected client sequence number.
	 * @param batchMetadata - When provided, expected as `metadata.batch`; when omitted, `metadata` is `undefined`.
	 * @returns An object typed as a sequenced document message, for use in deep-equality assertions.
	 */
	function getProcessedMessage(
		value: string,
		seqNum: number,
		clientSeqNum: number,
		batchMetadata?: boolean,
	): ISequencedDocumentMessage {
		const expectedMetadata =
			batchMetadata === undefined ? undefined : { batch: batchMetadata };
		const expectedContents = { key: value };
		// The `metadata` and `compression` keys are kept even when undefined, since
		// deep strict equality distinguishes a missing key from an undefined one.
		// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
		return {
			type: ContainerMessageType.FluidDataStoreOp,
			metadata: expectedMetadata,
			compression: undefined,
			sequenceNumber: seqNum,
			clientSequenceNumber: clientSeqNum,
			referenceSequenceNumber: Number.POSITIVE_INFINITY,
			contents: expectedContents,
		} as ISequencedDocumentMessage;
	}

	// Option sets driving the parameterized "single batch" tests below. Each entry says
	// whether the outbound batch is compressed, chunked and/or grouped before being fed
	// back through the RemoteMessageProcessor.
	// NOTE(review): presumably generatePairwiseOptions yields combinations covering every
	// pair of option values - confirm against the test-pairwise-generator package.
	const messageGenerationOptions = generatePairwiseOptions<{
		// chunking cannot happen without compression
		// (so the two flags are modeled together as a union that excludes "chunking only")
		compressionAndChunking:
			| {
					compression: false;
					chunking: false;
			  }
			| {
					compression: true;
					chunking: boolean;
			  };
		grouping: boolean;
	}>({
		// The three valid compression/chunking combinations
		compressionAndChunking: [
			{ compression: false, chunking: false },
			{ compression: true, chunking: false },
			{ compression: true, chunking: true },
		],
		grouping: [true, false],
	});

	// One test per generated option set: build a five-message batch, optionally group,
	// compress and chunk it, then feed the resulting wire messages through a
	// RemoteMessageProcessor and check that the original ops come back out.
	for (const option of messageGenerationOptions) {
		const { compression, chunking } = option.compressionAndChunking;
		const { grouping } = option;

		it(`Correctly processes single batch: compression [${compression}] chunking [${chunking}] grouping [${grouping}]`, () => {
			const mockLogger = new MockLogger();

			// Contents of the batch under test, with the batch start/end markers on the
			// first/last messages. Drives both the outbound batch and the expected output.
			const batchContents: [value: string, batchMarker?: boolean][] = [
				["a", true],
				["b"],
				["c"],
				["d"],
				["e", false],
			];

			let batch: OutboundBatch = {
				contentSizeInBytes: 1,
				referenceSequenceNumber: Number.POSITIVE_INFINITY,
				messages: batchContents.map(([value, batchMarker]) =>
					getOutboundMessage(value, batchMarker),
				),
			};

			if (grouping) {
				batch = new OpGroupingManager(
					{ groupedBatchingEnabled: true },
					mockLogger,
				).groupBatch(batch);
			}

			// Messages handed to the splitter's submit callback land here first,
			// followed by the messages of the final batch.
			const outboundMessages: IBatchMessage[] = [];
			let leadingChunkCount = 0;
			if (compression) {
				batch = isSingletonBatch(batch)
					? new OpCompressor(mockLogger).compressBatch(batch)
					: compressMultipleMessageBatch(batch);

				if (chunking) {
					const splitter = new OpSplitter(
						[],
						(chunkMessages: IBatchMessage[]) => {
							leadingChunkCount += 1;
							outboundMessages.push(...chunkMessages);
							return 0;
						},
						2,
						Number.POSITIVE_INFINITY,
						mockLogger,
					);
					// NOTE: This function still supports batches with empty placeholder ops, we just need to cast to use it.
					batch = splitter.splitSingletonBatchMessage(batch as OutboundSingletonBatch);
				}
			}

			// Sequence number that the first message of the final batch will be given below
			const batchStartSeqNum = outboundMessages.length + 1;
			outboundMessages.push(...batch.messages);

			const messageProcessor = getMessageProcessor();
			const inboundMessages: InboundSequencedContainerRuntimeMessage[] = [];
			let batchStart: BatchStartInfo | undefined;
			let lastSeqNum = 0;
			for (const outbound of outboundMessages) {
				// Sequence number and client sequence number advance together, starting at 1
				lastSeqNum += 1;
				// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
				const inboundMessage = {
					type: MessageType.Operation,
					contents: outbound.contents,
					metadata: outbound.metadata,
					compression: outbound.compression,
					sequenceNumber: lastSeqNum,
					clientSequenceNumber: lastSeqNum,
					referenceSequenceNumber: outbound.referenceSequenceNumber,
				} as ISequencedDocumentMessage;

				ensureContentsDeserialized(inboundMessage);
				const result = messageProcessor.process(inboundMessage, () => {});
				switch (result?.type) {
					case "fullBatch": {
						assert(
							chunking || outboundMessages.length === 1,
							"Apart from chunking, expected fullBatch for single-message batch only (includes Grouped Batches)",
						);
						batchStart = result.batchStart;
						inboundMessages.push(...result.messages);
						break;
					}
					case "batchStartingMessage": {
						batchStart = result.batchStart;
						inboundMessages.push(result.nextMessage);
						break;
					}
					case "nextBatchMessage": {
						assert(
							batchStart !== undefined,
							"batchStart should have been set from a prior message",
						);
						inboundMessages.push(result.nextMessage);
						break;
					}
					default: {
						// These are leading chunks
						assert(result === undefined, "unexpected result type");
						assert(chunking, "undefined result only expected with chunking");
						break;
					}
				}
			}

			// Grouped: every message shares the batch's sequence number and the client
			// sequence numbers count up from 1.
			// Ungrouped: sequence number and client sequence number advance together from
			// the batch's first sequence number.
			const expected = batchContents.map(([value, batchMarker], index) =>
				grouping
					? getProcessedMessage(value, batchStartSeqNum, index + 1, batchMarker)
					: getProcessedMessage(
							value,
							batchStartSeqNum + index,
							batchStartSeqNum + index,
							batchMarker,
						),
			);

			assert.deepStrictEqual(inboundMessages, expected, "unexpected output");
			assert.equal(
				batchStart?.batchStartCsn,
				leadingChunkCount + 1,
				"unexpected batchStartCsn",
			);
		});
	}

	// Feeds four consecutive ungrouped batches (3, 1, 2 and 1 messages) through a single
	// processor and checks the result reported for every individual message.
	it("Processes multiple batches (No Grouped Batching)", () => {
		// Define the test messages grouped by batch
		type TestMessageInput = Pick<
			ISequencedDocumentMessage,
			"clientId" | "clientSequenceNumber" | "type" | "contents" | "metadata" | "compression"
		> & { contents: InboundContainerRuntimeMessage };
		interface TestBatchInput {
			messages: TestMessageInput[];
			batchId?: string; // For flush call when resubmitting
		}

		const clientId = "CLIENT_ID";
		let lastCsn = 0;
		// Creates the next input message; client sequence numbers are handed out in creation order.
		// The `metadata` key is only added when metadata is given, so messages without it have no such key.
		const testMessage = (name: string, metadata?: unknown): TestMessageInput => {
			const message: TestMessageInput = {
				contents: op(name),
				clientId,
				clientSequenceNumber: ++lastCsn,
				type: MessageType.Operation,
			};
			if (metadata !== undefined) {
				message.metadata = metadata;
			}
			return message;
		};

		// biome-ignore format: Easier to digest similarities/differences with single lines
		const testBatchesInput: TestBatchInput[] = [
			{ messages: [testMessage("A1", { batch: true }), testMessage("A2"), testMessage("A3", { batch: false })] },
			{ messages: [testMessage("B1")] },
			{ messages: [testMessage("C1", { batch: true, batchId: "C" }), testMessage("C2", { batch: false })], batchId: "C" },
			{ messages: [testMessage("D1", { batchId: "D" })], batchId: "D" },
		];

		// Expected shape of a processed message: the input message with its contents spread on top
		const unpackBatchMessage = (
			batchIndex: number,
			messageIndex: number,
		): InboundSequencedContainerRuntimeMessage => {
			const packed = testBatchesInput[batchIndex].messages[messageIndex];
			// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
			return {
				...packed,
				...packed.contents,
			} as Partial<InboundSequencedContainerRuntimeMessage> as InboundSequencedContainerRuntimeMessage;
		};

		// Expected batch start info for the given batch, keyed off the batch's first message
		const batchStartFor = (
			batchIndex: number,
			batchStartCsn: number,
			batchId?: string,
		): BatchStartInfo => ({
			batchId,
			clientId,
			batchStartCsn,
			keyMessage: unpackBatchMessage(batchIndex, 0),
		});

		// biome-ignore format: Easier to read leaving each result as a single line
		const expectedResults: InboundMessageResult[] = [
			{ type: "batchStartingMessage", nextMessage: unpackBatchMessage(0, 0), batchStart: batchStartFor(0, 1) },
			{ type: "nextBatchMessage", nextMessage: unpackBatchMessage(0, 1), batchEnd: false },
			{ type: "nextBatchMessage", nextMessage: unpackBatchMessage(0, 2), batchEnd: true },
			{ type: "fullBatch", messages: [unpackBatchMessage(1, 0)], batchStart: batchStartFor(1, 4), groupedBatch: false, length: 1 },
			{ type: "batchStartingMessage", nextMessage: unpackBatchMessage(2, 0), batchStart: batchStartFor(2, 5, "C") },
			{ type: "nextBatchMessage", nextMessage: unpackBatchMessage(2, 1), batchEnd: true },
			{ type: "fullBatch", messages: [unpackBatchMessage(3, 0)], batchStart: batchStartFor(3, 7, "D"), groupedBatch: false, length: 1 },
		];

		const processor = getMessageProcessor();
		const processResults: ReturnType<RemoteMessageProcessor["process"]>[] = [];
		for (const { messages } of testBatchesInput) {
			for (const message of messages) {
				processResults.push(processor.process(message as ISequencedDocumentMessage, () => {}));
			}
		}

		assert.deepStrictEqual(processResults, expectedResults, "unexpected output from process");
	});

	// Tests that the processor asserts when batch start/end markers arrive out of order.
	describe("Throws on invalid batches", () => {
		/**
		 * Pushes one mock op per name onto the given batch manager (non-reentrant, refSeq 1).
		 */
		function pushOps(batchManager: BatchManager, ...names: string[]): void {
			for (const name of names) {
				batchManager.push(
					{ runtimeOp: op(name), referenceSequenceNumber: 1 },
					false /* reentrant */,
				);
			}
		}

		/**
		 * Adds clientId and CSN as would happen on final stage of submit.
		 * Only each message's metadata is carried over; CSNs count up from 1.
		 */
		function toInboundMessages(
			messages: readonly Pick<ISequencedDocumentMessage, "metadata">[],
		): ISequencedDocumentMessage[] {
			return messages.map<ISequencedDocumentMessage>(
				({ metadata }, index) =>
					({
						metadata,
						clientId: "CLIENT_ID",
						clientSequenceNumber: index + 1,
					}) satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
			);
		}

		/**
		 * Processes the messages in order with a fresh processor and asserts that an error
		 * with exactly the given message is thrown along the way.
		 *
		 * @param inboundMessages - Messages to process, in order.
		 * @param expectedErrorMessage - Exact `message` of the error that must be thrown.
		 * @param failureMessage - Message reported by the test if nothing is thrown.
		 */
		function assertProcessingThrows(
			inboundMessages: ISequencedDocumentMessage[],
			expectedErrorMessage: string,
			failureMessage: string,
		): void {
			const processor = getMessageProcessor();
			assert.throws(
				() => {
					// A plain loop is used since only the side effects (the throw) matter
					for (const message of inboundMessages) {
						processor.process(message, () => {});
					}
				},
				(e: Error) => e.message === expectedErrorMessage,
				failureMessage,
			);
		}

		it("Unexpected batch start marker mid-batch", () => {
			const batchManager = new BatchManager({
				canRebase: false,
			});
			pushOps(batchManager, "A1", "A2", "A3");
			const batchA = batchManager.popBatch();
			batchA.messages[2].metadata = undefined; // Wipe out the ending metadata so the next batch's start shows up mid-batch
			pushOps(batchManager, "B1", "B2");
			const batchB = batchManager.popBatch();

			assertProcessingThrows(
				toInboundMessages([...batchA.messages, ...batchB.messages]),
				"0x9d6",
				"unexpected batch start marker should trigger assert",
			);
		});

		it("Unexpected batch end marker when no batch has started", () => {
			const batchManager = new BatchManager({
				canRebase: false,
			});
			pushOps(batchManager, "A1", "A2", "A3");
			const batchA = batchManager.popBatch();
			batchA.messages[0].metadata = undefined; // Wipe out the starting metadata

			assertProcessingThrows(
				toInboundMessages(batchA.messages),
				"0x9d5",
				"unexpected batch end marker should trigger assert",
			);
		});
	});

	// A message whose contents arrive as a JSON string must be deserialized and then
	// unpacked into the inner op's type and contents.
	it("Processes legacy string-content message", () => {
		const expectedOp = {
			contents: { key: "value" },
			type: ContainerMessageType.FluidDataStoreOp,
		};
		const legacyMessage = {
			contents: JSON.stringify(expectedOp),
			clientId: "clientId",
			type: MessageType.Operation,
			metadata: { meta: "data" },
		};
		const documentMessage = legacyMessage as ISequencedDocumentMessage;
		ensureContentsDeserialized(documentMessage);

		const processResult = getMessageProcessor().process(documentMessage, () => {});

		assert.equal(
			processResult?.type,
			"fullBatch",
			"Single message should yield a 'fullBatch' result",
		);
		assert.strictEqual(processResult.length, 1, "only expected a single processed message");

		const inboundMessage = processResult.messages[0];
		assert.deepStrictEqual(inboundMessage.contents, expectedOp.contents);
		assert.deepStrictEqual(inboundMessage.type, expectedOp.type);
	});

	// A non-operation message (here: Summarize) must pass through with its type and
	// contents left as they were.
	it("Don't unpack non-datastore messages", () => {
		const summarizeMessage = {
			contents: { key: "value" },
			clientId: "clientId",
			type: MessageType.Summarize,
			metadata: { meta: "data" },
		};
		const documentMessage = summarizeMessage as ISequencedDocumentMessage;

		const processResult = getMessageProcessor().process(documentMessage, () => {});

		assert.equal(
			processResult?.type,
			"fullBatch",
			"Single message should yield a 'fullBatch' result",
		);
		assert.strictEqual(processResult.length, 1, "only expected a single processed message");

		const inboundMessage = processResult.messages[0];
		assert.deepStrictEqual(inboundMessage.contents, summarizeMessage.contents);
		assert.deepStrictEqual(inboundMessage.type, summarizeMessage.type);
	});

	// A grouped batch holding two ops must be unrolled into two messages that share the
	// grouped batch's sequence number and client ID, with client sequence numbers 1 and 2.
	it("Processing groupedBatch works as expected", () => {
		const clientId = "CLIENT_ID";
		const batchId = "BATCH_ID";
		const sequenceNumber = 10;
		// CSN of the grouped batch message itself; expected as batchStartCsn
		const groupedBatchCsn = 12;

		const groupedBatch = {
			type: MessageType.Operation,
			sequenceNumber,
			clientSequenceNumber: groupedBatchCsn,
			clientId,
			metadata: { batchId },
			contents: {
				type: OpGroupingManager.groupedBatchOp,
				contents: [
					{
						metadata: { batch: true, batchId },
						contents: {
							type: ContainerMessageType.FluidDataStoreOp,
							contents: { contents: "a" },
						},
					},
					{
						metadata: { batch: false },
						contents: {
							type: ContainerMessageType.FluidDataStoreOp,
							contents: { contents: "b" },
						},
					},
				],
			},
		};

		const processResult = getMessageProcessor().process(
			groupedBatch as ISequencedDocumentMessage,
			() => {},
		);

		// Expected messages are written out independently of the input objects above
		const expectedMessages = [
			{
				type: ContainerMessageType.FluidDataStoreOp,
				clientId,
				sequenceNumber,
				clientSequenceNumber: 1,
				compression: undefined,
				metadata: { batch: true, batchId },
				contents: { contents: "a" },
			},
			{
				type: ContainerMessageType.FluidDataStoreOp,
				clientId,
				sequenceNumber,
				clientSequenceNumber: 2,
				compression: undefined,
				metadata: { batch: false },
				contents: { contents: "b" },
			},
		];
		const expectedResult = {
			type: "fullBatch",
			messages: expectedMessages,
			batchStart: {
				batchStartCsn: groupedBatchCsn,
				clientId,
				batchId,
				keyMessage: expectedMessages[0],
			},
			groupedBatch: true,
			length: 2,
		};

		assert.deepStrictEqual(
			processResult,
			expectedResult,
			"unexpected processing of groupedBatch",
		);
	});

	// An empty grouped batch must yield a zero-length full batch whose key message is the
	// grouped batch message itself.
	it("Processing empty groupedBatch works as expected", () => {
		const clientId = "CLIENT_ID";
		const batchId = "BATCH_ID";
		// CSN of the grouped batch message itself; expected as batchStartCsn
		const groupedBatchCsn = 8;

		const emptyGroupedBatch = {
			type: MessageType.Operation,
			sequenceNumber: 10,
			clientSequenceNumber: groupedBatchCsn,
			clientId,
			metadata: { batchId },
			contents: {
				type: OpGroupingManager.groupedBatchOp,
				contents: [],
			},
		};

		const processResult = getMessageProcessor().process(
			emptyGroupedBatch as ISequencedDocumentMessage,
			() => {},
		);

		const expectedResult = {
			type: "fullBatch",
			messages: [],
			batchStart: {
				batchStartCsn: groupedBatchCsn,
				clientId,
				batchId,
				keyMessage: emptyGroupedBatch,
			},
			groupedBatch: true,
			length: 0,
		};
		assert.deepStrictEqual(
			processResult,
			expectedResult,
			"unexpected processing of empty groupedBatch",
		);
	});
});
